001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.hadoop.record;
020
021import java.io.InputStreamReader;
022import java.io.InputStream;
023import java.io.IOException;
024import java.io.PushbackReader;
025import java.io.UnsupportedEncodingException;
026
027import org.apache.hadoop.classification.InterfaceAudience;
028import org.apache.hadoop.classification.InterfaceStability;
029
030/**
031 * @deprecated Replaced by <a href="http://hadoop.apache.org/avro/">Avro</a>.
032 */
033@Deprecated
034@InterfaceAudience.Public
035@InterfaceStability.Stable
036public class CsvRecordInput implements RecordInput {
037    
038  private PushbackReader stream;
039    
040  private class CsvIndex implements Index {
041    @Override
042    public boolean done() {
043      char c = '\0';
044      try {
045        c = (char) stream.read();
046        stream.unread(c);
047      } catch (IOException ex) {
048      }
049      return (c == '}') ? true : false;
050    }
051    @Override
052    public void incr() {}
053  }
054    
055  private void throwExceptionOnError(String tag) throws IOException {
056    throw new IOException("Error deserializing "+tag);
057  }
058    
059  private String readField(String tag) throws IOException {
060    try {
061      StringBuilder buf = new StringBuilder();
062      while (true) {
063        char c = (char) stream.read();
064        switch (c) {
065        case ',':
066          return buf.toString();
067        case '}':
068        case '\n':
069        case '\r':
070          stream.unread(c);
071          return buf.toString();
072        default:
073          buf.append(c);
074        }
075      }
076    } catch (IOException ex) {
077      throw new IOException("Error reading "+tag);
078    }
079  }
080    
081  /** Creates a new instance of CsvRecordInput */
082  public CsvRecordInput(InputStream in) {
083    try {
084      stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
085    } catch (UnsupportedEncodingException ex) {
086      throw new RuntimeException(ex);
087    }
088  }
089    
090  @Override
091  public byte readByte(String tag) throws IOException {
092    return (byte) readLong(tag);
093  }
094    
095  @Override
096  public boolean readBool(String tag) throws IOException {
097    String sval = readField(tag);
098    return "T".equals(sval) ? true : false;
099  }
100    
101  @Override
102  public int readInt(String tag) throws IOException {
103    return (int) readLong(tag);
104  }
105    
106  @Override
107  public long readLong(String tag) throws IOException {
108    String sval = readField(tag);
109    try {
110      long lval = Long.parseLong(sval);
111      return lval;
112    } catch (NumberFormatException ex) {
113      throw new IOException("Error deserializing "+tag);
114    }
115  }
116    
117  @Override
118  public float readFloat(String tag) throws IOException {
119    return (float) readDouble(tag);
120  }
121    
122  @Override
123  public double readDouble(String tag) throws IOException {
124    String sval = readField(tag);
125    try {
126      double dval = Double.parseDouble(sval);
127      return dval;
128    } catch (NumberFormatException ex) {
129      throw new IOException("Error deserializing "+tag);
130    }
131  }
132    
133  @Override
134  public String readString(String tag) throws IOException {
135    String sval = readField(tag);
136    return Utils.fromCSVString(sval);
137  }
138    
139  @Override
140  public Buffer readBuffer(String tag) throws IOException {
141    String sval = readField(tag);
142    return Utils.fromCSVBuffer(sval);
143  }
144    
145  @Override
146  public void startRecord(String tag) throws IOException {
147    if (tag != null && !tag.isEmpty()) {
148      char c1 = (char) stream.read();
149      char c2 = (char) stream.read();
150      if (c1 != 's' || c2 != '{') {
151        throw new IOException("Error deserializing "+tag);
152      }
153    }
154  }
155    
156  @Override
157  public void endRecord(String tag) throws IOException {
158    char c = (char) stream.read();
159    if (tag == null || tag.isEmpty()) {
160      if (c != '\n' && c != '\r') {
161        throw new IOException("Error deserializing record.");
162      } else {
163        return;
164      }
165    }
166        
167    if (c != '}') {
168      throw new IOException("Error deserializing "+tag);
169    }
170    c = (char) stream.read();
171    if (c != ',') {
172      stream.unread(c);
173    }
174        
175    return;
176  }
177    
178  @Override
179  public Index startVector(String tag) throws IOException {
180    char c1 = (char) stream.read();
181    char c2 = (char) stream.read();
182    if (c1 != 'v' || c2 != '{') {
183      throw new IOException("Error deserializing "+tag);
184    }
185    return new CsvIndex();
186  }
187    
188  @Override
189  public void endVector(String tag) throws IOException {
190    char c = (char) stream.read();
191    if (c != '}') {
192      throw new IOException("Error deserializing "+tag);
193    }
194    c = (char) stream.read();
195    if (c != ',') {
196      stream.unread(c);
197    }
198    return;
199  }
200    
201  @Override
202  public Index startMap(String tag) throws IOException {
203    char c1 = (char) stream.read();
204    char c2 = (char) stream.read();
205    if (c1 != 'm' || c2 != '{') {
206      throw new IOException("Error deserializing "+tag);
207    }
208    return new CsvIndex();
209  }
210    
211  @Override
212  public void endMap(String tag) throws IOException {
213    char c = (char) stream.read();
214    if (c != '}') {
215      throw new IOException("Error deserializing "+tag);
216    }
217    c = (char) stream.read();
218    if (c != ',') {
219      stream.unread(c);
220    }
221    return;
222  }
223}